package org.hope.lee.consumer.transaction;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 *   
 *  @ProjectName: base-project 
 *  @Description: 事务消费端
 *  @date: 2017/3/29  
 */
public class ConsumerTransaction {
    public ConsumerTransaction() {
        String group_name = "transaction_consumer";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
        consumer.setNamesrvAddr("192.168.31.165:9876;192.168.31.176:9876");
        try {
            consumer.subscribe("TopicTransaction", "*");
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

    }

    class Listener implements MessageListenerConcurrently{

        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

                try {
                    for(MessageExt msg : list) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + "topic:" + topic + ", tags:" + tags + ",msg:" + msgBody);
                        msg.getTags();
                    }
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }


    public static void main(String[] args) {
        ConsumerTransaction c = new ConsumerTransaction();
        System.out.println("transaction consumer start......");
    }
}
